[TRTLLM-12499][feat] (WIP) Add support for pipelined KVCache transfer for disaggregated serving in Python Cache Transceiver#15727

Open
athena-nv wants to merge 10 commits into
NVIDIA:mainfrom
athena-nv:trtllm-12499-pipelined-kvcache-transfer

@athena-nv athena-nv commented Jun 29, 2026

Collaborator

Description

Summary

Implements pipelined prefill-transfer for disaggregated serving. Instead of waiting for all prefill chunks to complete before starting the KV cache transfer, each chunk's KV data is transferred to the generation server as soon as that chunk's prefill completes. This overlaps GPU compute with RDMA transfer. When the per-chunk transfer time is shorter than the per-chunk prefill time (typical for 100+ Gbps NICs with long-context workloads), transfer latency is almost entirely hidden behind prefill computation; only the final chunk's transfer remains exposed.
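
The overlap argument can be checked with a small timing model. This is only a sketch under simplifying assumptions (a single transfer link, fixed per-chunk times, no scheduling overhead); the function names are hypothetical and do not come from the PR.

```python
def sequential_latency(prefill_ms, transfer_ms):
    """All chunks are prefilled first, then all KV data is transferred."""
    return sum(prefill_ms) + sum(transfer_ms)


def pipelined_latency(prefill_ms, transfer_ms):
    """Chunk i is transferred once its prefill is done and the link is free."""
    compute_done = 0.0  # time at which the current chunk's prefill finishes
    link_free = 0.0     # time at which the transfer link becomes idle
    for p, t in zip(prefill_ms, transfer_ms):
        compute_done += p
        link_free = max(link_free, compute_done) + t
    return link_free


prefill = [40.0] * 8   # assumed per-chunk prefill time in ms
transfer = [25.0] * 8  # assumed per-chunk transfer time in ms
print(sequential_latency(prefill, transfer))  # 520.0
print(pipelined_latency(prefill, transfer))   # 345.0
```

With these assumed numbers the pipelined total equals the full prefill time plus one chunk's transfer. If transfer is slower than prefill, the link becomes the bottleneck and the model shows the transfer time dominating instead.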

TODO: fix _create_kv_slices and add e2e and accuracy tests

Configuration

Enable chunked KV cache transfer by setting chunk_size_blocks in the YAML config:

# Context server config
context_servers:
  cache_transceiver_config:
    backend: "DEFAULT"          # or "NIXL"
    chunk_size_blocks: 64       # max blocks per layer group per chunk
    enable_pipelined_transfer: true
enable_chunked_prefill: true

# Generation server config (same setting)
generation_servers:
  cache_transceiver_config:
    backend: "DEFAULT"
    chunk_size_blocks: 64
    enable_pipelined_transfer: true
enable_chunked_prefill: true
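
As a rough illustration of what `chunk_size_blocks` bounds, the sketch below splits per-layer-group block ID lists into chunks of at most that many blocks per group. `split_into_chunks` is a hypothetical helper written for this example; it is not the PR's `_create_kv_slices`, and it assumes every layer group is indexed from the same block offset.

```python
def split_into_chunks(block_ids_per_layer_group, chunk_size_blocks):
    """Yield (chunk_block_offset, per-group block IDs, is_last_chunk)."""
    if chunk_size_blocks <= 0:
        raise ValueError("chunk_size_blocks must be positive")
    longest = max((len(ids) for ids in block_ids_per_layer_group), default=0)
    # Always emit at least one (possibly empty) chunk so a "last" chunk exists.
    offsets = list(range(0, longest, chunk_size_blocks)) or [0]
    for offset in offsets:
        chunk = [
            ids[offset:offset + chunk_size_blocks]
            for ids in block_ids_per_layer_group
        ]
        yield offset, chunk, offset == offsets[-1]


groups = [[10, 11, 12, 13, 14], [20, 21]]
for offset, chunk, is_last in split_into_chunks(groups, 2):
    print(offset, chunk, is_last)
```

Note that a shorter layer group yields empty lists in later chunks, which is the situation the review below calls an exhausted layer group.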

Phased Roadmap

This PR is Phase 2 of a 2-phase effort:

| Phase | What | Transceiver | PR | Status |
|---|---|---|---|---|
| 1 | Chunked transfer | Python | #15519 | In review |
| 2 | Pipelined prefill-transfer | Python | This PR | Prototype |

chienchunhung and others added 9 commits June 25, 2026 18:26
Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Made-with: Cursor
Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Made-with: Cursor
Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Made-with: Cursor
… counter

- Replace _collect_block_ids with _collect_base_slice to preserve the
  full KVSlice metadata (including mamba_state_index) through all new
  code paths: _create_kv_slices (sender) and request_and_receive_async
  (receiver).  Without this, Mamba/hybrid-state model transfers would
  lose required state metadata.

- Fix VSWA shared counter bug in WindowBlockManager::releasePrefixBlocks:
  snapshot mNumFrontBlocksRemoved before iterating window managers so
  each manager releases blocks from the same range.  Previously the
  first manager advanced the shared counter, causing subsequent managers
  to skip their own blocks entirely.

- Guard chunking integrity assertion with __debug__ to avoid O(N) CPU
  overhead on the hot path in optimized builds.

- Add tests for mamba_state_index propagation through chunked slices.
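
The shared-counter bug described in the VSWA bullet above can be reproduced in a toy Python model. This is a sketch only: the real code is C++ (`releasePrefixBlocks`), and the classes, fields, and method names below are hypothetical stand-ins.

```python
class WindowManager:
    """Toy per-window manager that records which blocks it released."""

    def __init__(self, blocks):
        self.blocks = list(blocks)
        self.released = []

    def release_range(self, start, end):
        self.released.extend(self.blocks[start:end])


class BlockManager:
    def __init__(self, managers):
        self.managers = managers
        self.num_front_blocks_removed = 0  # counter shared by all managers

    def release_prefix_buggy(self, upto):
        for m in self.managers:
            m.release_range(self.num_front_blocks_removed, upto)
            # Bug: advancing inside the loop makes later managers see an
            # empty range and skip their own blocks.
            self.num_front_blocks_removed = upto

    def release_prefix_fixed(self, upto):
        start = self.num_front_blocks_removed  # snapshot before iterating
        for m in self.managers:
            m.release_range(start, upto)
        self.num_front_blocks_removed = upto


buggy = BlockManager([WindowManager("abc"), WindowManager("xyz")])
buggy.release_prefix_buggy(2)
print([m.released for m in buggy.managers])

fixed = BlockManager([WindowManager("abc"), WindowManager("xyz")])
fixed.release_prefix_fixed(2)
print([m.released for m in fixed.managers])
```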

Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
- Update copyright year to 2026 in nanobind kvCacheManager.cpp
- Add OnChunkTransferredCallback type alias for precise callback typing
- Add strict=True to zip() calls in chunked transfer tests

Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Made-with: Cursor
- Fix chunking integrity check: use np.array_equal() instead of ==
  for numpy array comparison, raise ValueError instead of assert
  (eopXD comment on transceiver.py)

- Add explicit VSWA limitation comment in BlockManager::releasePrefixBlocks
  documenting the single-window-size assumption
  (eopXD comment on kvCacheManager.cpp)

- Auto-select Python transceiver when chunk_size_blocks is set and
  backend is NIXL/DEFAULT. The C++ transceiver does not support
  chunked transfer; this makes chunking work without requiring
  users to manually set transceiver_runtime="PYTHON"
  (pcastonguay comment on transceiver.py)

Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Made-with: Cursor
Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Made-with: Cursor
Per reviewer feedback (chuangz0, Shixiaowei02): chunk_block_offset
belongs as a member of KVSlice rather than a function parameter on
send(). The KVSlice dataclass was designed to carry all slice metadata.

- Add chunk_block_offset: int = 0 to KVSlice dataclass
- Remove chunk_block_offset from TxSessionBase.send() signature
- Remove chunk_block_offset from TxSession.send() signature
- Remove chunk_block_offset from KVSendTask.__init__
- Read chunk_block_offset from task._slice in _build_kv_write_meta
  and _deliver_kv_to_agent callback
- Set chunk_block_offset on each KVSlice in _create_kv_slices
- Update all tests accordingly
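
A minimal sketch of the shape this refactor aims for, assuming a simplified slice type: the offset travels on the slice itself, so `send()` needs no extra parameter. `KVSliceSketch` and the free function `send` are illustrative stand-ins, not the real `KVSlice` or `TxSession.send()`.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class KVSliceSketch:
    """Simplified stand-in for KVSlice; only the fields needed here."""

    block_ids_per_layer_groups: List[List[int]] = field(default_factory=list)
    is_last_slice: bool = False
    # Defaulting to 0 keeps unchunked callers working without changes.
    chunk_block_offset: int = 0


def send(kv_slice):
    """Stand-in for a session's send(): reads the offset from the slice."""
    return kv_slice.chunk_block_offset, kv_slice.is_last_slice


print(send(KVSliceSketch([[1, 2]], is_last_slice=True)))
print(send(KVSliceSketch([[3, 4]], chunk_block_offset=2)))
```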

Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Made-with: Cursor
Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
Signed-off-by: Athena Cai <athenac@nvidia.com>
Signed-off-by: Athena Cai <athenac@nvidia.com>
Signed-off-by: Athena Cai <athenac@nvidia.com>
…isaggregated serving in Python Cache Transceiver

Credit: the design and much of the implementation in this PR comes from NVIDIA#12781 by @chienchunhung

TODO: fix _create_kv_slices and add e2e and accuracy tests

Signed-off-by: Athena Cai <athenac@nvidia.com>
@coderabbitai

coderabbitai Bot commented Jun 29, 2026

Contributor

Walkthrough

Adds chunked and pipelined KV cache transfer to disaggregated serving. KVSlice gains chunk_block_offset and cuda_event fields. KvCacheTransceiverV2 gains slice-chunking helpers, a send_prefill_chunk method, and pipelining logic. The executor dispatches prefill chunks as they complete. CacheTransceiverConfig exposes the new knobs, and the transceiver factory auto-selects the Python backend when chunking is configured.

Changes

Chunked & Pipelined KV Transfer

| Layer / File(s) | Summary |
|---|---|
| KVSlice and config contracts: tensorrt_llm/_torch/disaggregation/base/transfer.py, tensorrt_llm/llmapi/llm_args.py, tensorrt_llm/_torch/pyexecutor/kv_cache_transceiver.py | KVSlice gains chunk_block_offset and cuda_event fields with updated TxSessionBase.send docstring. CacheTransceiverConfig adds chunk_size_blocks and enable_pipelined_transfer (excluded from _to_pybind). KvCacheTransceiver base adds enable_pipelined_transfer defaulting to False. |
| Transceiver chunking and pipelining: tensorrt_llm/_torch/disaggregation/transceiver.py | Initializes _chunk_size_blocks, _enable_pipelined_transfer, _pipelined_chunk_offsets in KvCacheTransceiverV2. Adds _collect_base_slice/_create_kv_slices for chunk splitting with integrity checks. Implements send_prefill_chunk and updates respond_and_send_async to skip re-send when pipelined chunks are already complete. Updates receiver path to use _collect_base_slice with is_last_slice=True. |
| Native RDMA chunking and session status: tensorrt_llm/_torch/disaggregation/native/transfer.py | _deliver_kv_to_agent synchronizes cuda_event before RDMA. _build_kv_write_meta slices dst block IDs per chunk offset. Success-path KV result always reports receiver_slice_id=0 and uses is_last_slice for completion. TxSession.status adds ERROR checks for exceptions and failed tasks. RxSession.process_aux_agent_result uses the last KV task for expected-transfer counting. |
| Executor wiring and factory: tensorrt_llm/_torch/pyexecutor/kv_cache_transceiver.py, tensorrt_llm/_torch/pyexecutor/py_executor.py | create_kv_cache_transceiver auto-selects Python transceiver when chunk_size_blocks is set, adds mismatch/threshold warnings. New _maybe_send_prefill_chunk helper dispatches pipelined chunks from _update_request_states_tp after each context chunk advance. |
| Tests: tests/unittest/disaggregated/test_chunked_transfer.py, tests/unittest/disaggregated/test_kv_transfer.py, tests/unittest/llmapi/test_llm_args.py, tests/integration/defs/accuracy/test_disaggregated_serving.py, tests/integration/test_lists/test-db/l0_dgx_b200.yml | New test_chunked_transfer.py covers KVSendTask, multi-slice TxSession/RxSession state machines, pipelined config gating, and respond_and_send_async skip logic. Extended test_kv_transfer.py adds _create_kv_slices correctness tests and chunked/pipelined end-to-end workers. test_llm_args.py validates chunk_size_blocks. Integration test and CI YAML add GSM8K accuracy runs for chunked NIXL transfer. |

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Description check | ⚠️ Warning | The description is mostly a brief summary and TODO; it omits the required Description, Test Coverage, and PR Checklist sections. | Rewrite it using the template, add a clear problem/solution summary, list tests, and complete the PR checklist. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 71.43% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (3 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title matches the main change and follows the required [ticket][feat] format. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tensorrt_llm/_torch/disaggregation/native/transfer.py (1)

488-507: 🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Mark the task as in-flight before waiting on the CUDA event.

Line 492 waits while the task is still INIT, so cancel_request() can see no TRANSFERRING tasks and free KV pages before the event completes. Move the INIT→TRANSFERRING transition before the event wait, and keep the cancelled/error abort path before synchronization.

Suggested fix
-        # For pipelined prefill-transfer: wait for the GPU forward
-        # to finish writing KV data before starting RDMA.  This
-        # blocks only this worker thread, not the GPU or main thread.
-        if task._slice.cuda_event is not None: # TODO: should I sync after the task status is set to TRANSFERRING?
-            task._slice.cuda_event.synchronize()
-
-        if timer:
-            timer.record_push_end(write_meta.peer_rank)
         # Hold session.lock to serialize the INIT→TRANSFERRING transition with
         # cancel(): prevents cancel_request() from freeing KV pages while a
         # worker is about to write into them.
         with session.lock:
             status = session.status
             if status in (SessionStatus.ERROR, SessionStatus.CANCELLED):
                 should_abort = True
             else:
                 task.status = TaskStatus.TRANSFERRING
                 should_abort = False
+
+        if should_abort:
+            ...
+            return
+
+        # For pipelined prefill-transfer: wait for the GPU forward
+        # to finish writing KV data before starting RDMA.
+        if task._slice.cuda_event is not None:
+            task._slice.cuda_event.synchronize()
+
+        if timer:
+            timer.record_push_end(write_meta.peer_rank)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tensorrt_llm/_torch/disaggregation/native/transfer.py` around lines 488 -
507, The task transition in transfer.py is happening too late in the
prefill-transfer flow: `task._slice.cuda_event.synchronize()` runs while the
task is still `INIT`, so `cancel_request()` can miss it and free KV pages too
early. In the transfer path around `task`, `session.lock`, and
`TaskStatus.TRANSFERRING`, move the INIT→TRANSFERRING state update (with the
session ERROR/CANCELLED abort check) before waiting on the CUDA event, and keep
the abort branch ahead of synchronization so in-flight work is visible before
any blocking wait.
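
The ordering the review asks for can be sketched with standard-library primitives. This is an illustration under assumptions, not the PR's code: `threading.Event` stands in for the CUDA event (whose real call is `synchronize()`), and `Session`, `Task`, and `deliver` are hypothetical simplifications.

```python
import threading
from enum import Enum


class TaskStatus(Enum):
    INIT = "init"
    TRANSFERRING = "transferring"
    ABORTED = "aborted"


class Session:
    def __init__(self):
        self.lock = threading.Lock()
        self.cancelled = False


class Task:
    def __init__(self, ready_event):
        self.status = TaskStatus.INIT
        self.ready_event = ready_event  # stands in for the CUDA event


def deliver(session, task, do_transfer):
    # 1. Publish the in-flight state (or abort) while holding the lock,
    #    so a concurrent cancel never observes a stale INIT task.
    with session.lock:
        if session.cancelled:
            task.status = TaskStatus.ABORTED
            return False
        task.status = TaskStatus.TRANSFERRING
    # 2. Only now block until the producer has finished writing the data.
    task.ready_event.wait()
    do_transfer()
    return True
```

Because the abort check runs before the wait, a cancelled session returns immediately instead of blocking on an event that may never be signalled.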
🧹 Nitpick comments (2)
tensorrt_llm/_torch/disaggregation/transceiver.py (2)

582-585: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value

Redundant session assignment.

_get_or_create_send_session already inserts the session into self._send_sessions, so re-assigning the return value is redundant (and could mask a future divergence between the two code paths). Mirror the simpler form used in respond_and_send_async.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tensorrt_llm/_torch/disaggregation/transceiver.py` around lines 582 - 585,
The send-session initialization in transceiver logic has a redundant assignment
because _get_or_create_send_session already stores the session in
self._send_sessions. Update the rid-not-in-self._send_sessions branch in
transceiver.py to follow the same pattern as respond_and_send_async by simply
invoking _get_or_create_send_session(req) for its side effects, then keep
setting _ever_had_send_session and _pipelined_chunk_offsets[rid] as before.

602-604: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Numerous open TODOs in the pipelined-send path before merge.

send_prefill_chunk and respond_and_send_async carry several unresolved TODO(athenac) questions on correctness-critical fields (token_range, mamba_state_index, layer_range, the req.state transition, the offset accumulation "might be a faulty calculation", and the redundancy between the two methods). Since the PR is marked WIP, these need resolution before this is production-ready. I can help draft the offset/metadata handling and consolidate the shared logic into a single helper.

Also applies to: 608-611, 628-631, 657-666, 675-675

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tensorrt_llm/_torch/disaggregation/transceiver.py` around lines 602 - 604,
The pipelined-send path in transceiver.py still contains unresolved correctness
TODOs in send_prefill_chunk and respond_and_send_async, especially around
token_range, mamba_state_index, layer_range, req.state transitions, and the
offset accumulation logic. Resolve these TODO(athenac) questions by verifying
the metadata semantics, fixing the offset calculation, and making the state
update explicit and correct before merge. Also remove the duplicated logic
between send_prefill_chunk and respond_and_send_async by consolidating the
shared send/metadata assembly into a single helper so the two paths stay
consistent.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tensorrt_llm/_torch/disaggregation/native/transfer.py`:
- Around line 728-745: The chunked destination slicing in transfer logic is now
using chunk offsets, but the token alignment still assumes each chunk maps to a
suffix ending at token_range.end. Update the code around the chunked path in
transfer.py and the downstream token-start calculation to derive starts from
chunk_block_offset, or require callers to provide per-chunk KVSlice.token_range
for each chunk. Make sure the block selection and token-range alignment stay
consistent for prefix-cache and SWA cases so the written blocks match the
intended chunk.
- Around line 523-524: The abort/result notification path in transfer.py still
uses write_meta.slice_id, which can conflict with the receiver’s single-task
slice handling. Update the abort send logic in the relevant transfer routine to
mirror the success path by reporting receiver_slice_id as 0 for aborts too, so
the receiver does not see a later-chunk slice ID and hit its slice assertion.
Keep the existing task/event unblocking behavior intact while ensuring the
aborted/failure result is always sent to receiver slice 0.
- Around line 736-743: The chunk-to-destination mapping in transfer.py is too
strict for exhausted layer groups: when len(src_block_ids) is 0, the current
bounds check in the chunk slicing logic still raises on advanced chunk_offset
values. Update the chunk handling around the dst_block_ids slice so empty source
chunks become a no-op and do not trigger the out-of-bounds error; keep the
existing bounds validation for non-empty chunks in the same chunk
offset/full_dst_block_ids path.

In `@tensorrt_llm/_torch/disaggregation/transceiver.py`:
- Line 648: The `respond_and_send_async` skip guard in `transceiver.py` needs
both a lint fix and a logic check: move the `return` onto its own line to
satisfy E701, and verify the condition around `rid in self._send_sessions and
rid not in self._pipelined_chunk_offsets` correctly prevents duplicate sends
while pipelined chunks are still outstanding. If needed, adjust the guard so the
full `_create_kv_slices` resend path only runs when it is safe, using the
existing `_send_sessions` and `_pipelined_chunk_offsets` state to avoid
duplicate transfer.
- Around line 591-611: The chunking logic in send_prefill_chunk() and the
_pipelined_chunk_offsets update can split KV slices on token boundaries that are
not aligned to tokens_per_block, which causes the boundary block to be resent
and offsets to drift. Adjust the prefill chunk selection so every chunk boundary
lands on a KV block boundary (or clamp the sliding-window fallback so it only
overlaps when it evenly divides tokens_per_block), and then recompute
_pipelined_chunk_offsets from the actual block count in the chunk.

In `@tests/unittest/disaggregated/test_kv_transfer.py`:
- Around line 1789-1825: The send/receive flow is using the wrong API shape:
TxSession.send() and RxSession.receive() should be called with a fully populated
KVSlice rather than extra kwargs, and they do not return futures. Update the
test setup around KVSlice, sender_session.send(), and
receiver_sessions/RxSession.receive() to set chunk_block_offset and cuda_event
on the slice object before calling send/receive, then replace the .result()
waits with wait_complete()/wait_complete(blocking=True) on the session or slice
as appropriate.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: 784a62c4-609a-4545-bc83-7d70311b9078

📥 Commits

Reviewing files that changed from the base of the PR and between a3026c9 and 9aed65c.

📒 Files selected for processing (11)
  • tensorrt_llm/_torch/disaggregation/base/transfer.py
  • tensorrt_llm/_torch/disaggregation/native/transfer.py
  • tensorrt_llm/_torch/disaggregation/transceiver.py
  • tensorrt_llm/_torch/pyexecutor/kv_cache_transceiver.py
  • tensorrt_llm/_torch/pyexecutor/py_executor.py
  • tensorrt_llm/llmapi/llm_args.py
  • tests/integration/defs/accuracy/test_disaggregated_serving.py
  • tests/integration/test_lists/test-db/l0_dgx_b200.yml
  • tests/unittest/disaggregated/test_chunked_transfer.py
  • tests/unittest/disaggregated/test_kv_transfer.py
  • tests/unittest/llmapi/test_llm_args.py

Comment on lines 523 to +524
str(write_meta.slice_id).encode("ascii"),
b"True", # is_last_slice — ensures receiver resolves its task future
b"True", # is_last_slice — ensures receiver resolves its task event

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Send abort results to receiver slice 0 as well.

The success path makes sender-side chunking transparent by reporting receiver_slice_id = 0; this abort path still sends write_meta.slice_id, so a cancelled/failed later chunk can trip the receiver’s single-task slice assertion instead of unblocking it.

Suggested fix
-                    str(write_meta.slice_id).encode("ascii"),
+                    b"0",

Comment on lines +728 to +745
chunk_offset = task._slice.chunk_block_offset
for (self_lg, self_pi), (peer_lg, peer_pi) in pool_mapping.items():
src_block_ids = src_block_ids_per_groups[self_lg]
dst_block_ids = dst_block_ids_per_groups[peer_lg]
full_dst_block_ids = dst_block_ids_per_groups[peer_lg]

# When sender uses chunking, the receiver sends all dst
# blocks in a single RecvReqInfo. Slice dst to match
# this task's src chunk position.
if chunk_offset > 0 or not task._slice.is_last_slice:
chunk_end = chunk_offset + len(src_block_ids)
if chunk_end > full_dst_block_ids.size:
raise ValueError(
f"dst chunk range out of bounds: offset={chunk_offset}, "
f"len={len(src_block_ids)}, dst_blocks={full_dst_block_ids.size}"
)
dst_block_ids = full_dst_block_ids[chunk_offset:chunk_end]
else:
dst_block_ids = full_dst_block_ids

🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

Keep chunk token ranges consistent with chunk_block_offset.

This path now slices destination blocks by chunk, but the alignment below still infers token starts as if each chunk’s block list were the suffix ending at token_range.end. If chunked slices carry the full request range, prefix-cache/SWA cases can align and write the wrong blocks. Either require per-chunk KVSlice.token_range from callers or derive the token starts from chunk_block_offset.
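
One way to read the second option (deriving token starts from `chunk_block_offset`) is sketched below. It is a hypothetical helper under a strong assumption: the slice's block list is contiguous and begins at the block containing the request's first token. Prefix-cache and SWA layouts may violate that assumption and would need their own handling.

```python
def chunk_token_range(request_start, request_end, chunk_block_offset,
                      num_chunk_blocks, tokens_per_block):
    """Return the [start, end) token range covered by one chunk of blocks."""
    # Token index at which the block holding request_start begins.
    first_block_start = (request_start // tokens_per_block) * tokens_per_block
    chunk_start = first_block_start + chunk_block_offset * tokens_per_block
    chunk_end = chunk_start + num_chunk_blocks * tokens_per_block
    # Clamp to the request so partial first/last blocks are handled.
    return max(request_start, chunk_start), min(request_end, chunk_end)


# Assumed example: 150 tokens, 32 tokens per block, chunks of 2 blocks.
print(chunk_token_range(0, 150, 0, 2, 32))  # (0, 64)
print(chunk_token_range(0, 150, 2, 2, 32))  # (64, 128)
print(chunk_token_range(0, 150, 4, 1, 32))  # (128, 150)
```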


Comment on lines +736 to +743
if chunk_offset > 0 or not task._slice.is_last_slice:
chunk_end = chunk_offset + len(src_block_ids)
if chunk_end > full_dst_block_ids.size:
raise ValueError(
f"dst chunk range out of bounds: offset={chunk_offset}, "
f"len={len(src_block_ids)}, dst_blocks={full_dst_block_ids.size}"
)
dst_block_ids = full_dst_block_ids[chunk_offset:chunk_end]

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Allow exhausted layer groups to map to empty destination slices.

With asymmetric layer-group lengths, later chunks can have len(src_block_ids) == 0 while chunk_offset has advanced past that group’s destination length. That should be a no-op, not an out-of-bounds error.
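
The intended behavior can be sketched as a standalone function. This is an illustration using plain lists rather than the arrays in the PR, and `dst_blocks_for_chunk` is a hypothetical name; it mirrors the chunked/unchunked branch shown in the quoted code and adds the empty-source no-op.

```python
def dst_blocks_for_chunk(full_dst_block_ids, src_block_ids,
                         chunk_offset, is_last_slice):
    """Pick the destination blocks matching one sender chunk."""
    if chunk_offset == 0 and is_last_slice:
        # Unchunked transfer: the single slice covers every dst block.
        return list(full_dst_block_ids)
    if len(src_block_ids) == 0:
        # Layer group already exhausted: nothing to write for this chunk.
        return []
    chunk_end = chunk_offset + len(src_block_ids)
    if chunk_end > len(full_dst_block_ids):
        raise ValueError(
            f"dst chunk range out of bounds: offset={chunk_offset}, "
            f"len={len(src_block_ids)}, dst_blocks={len(full_dst_block_ids)}"
        )
    return list(full_dst_block_ids[chunk_offset:chunk_end])


print(dst_blocks_for_chunk([7, 8, 9], [1, 2], 1, True))  # [8, 9]
print(dst_blocks_for_chunk([7, 8, 9], [], 5, False))     # []
```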

Suggested fix
                 if chunk_offset > 0 or not task._slice.is_last_slice:
+                    if len(src_block_ids) == 0:
+                        dst_block_ids = full_dst_block_ids[:0]
+                        continue
                     chunk_end = chunk_offset + len(src_block_ids)
                     if chunk_end > full_dst_block_ids.size:
                         raise ValueError(

Comment on lines +591 to +611
base_slice = self._collect_base_slice(req)
chunk_block_ids = [
block_ids[chunk_start_block:chunk_end_block]
for block_ids in base_slice.block_ids_per_layer_groups
]

kv_slice = KVSlice(
is_last_slice=is_last_chunk,
block_ids_per_layer_groups=chunk_block_ids,
chunk_block_offset=chunk_block_offset,
cuda_event=cuda_event,
token_range=base_slice.token_range, # TODO(athenac): is this correct? probably not. what is token range used for?
mamba_state_index=base_slice.mamba_state_index, # TODO(athenac): is this correct?
layer_range=base_slice.layer_range, # TODO(athenac): is this correct?
)

session.send(kv_slice)
req.state = LlmRequestState.DISAGG_CONTEXT_TRANS_IN_PROGRESS # TODO(athenac): is this correct?

num_blocks_this_chunk = max((len(ids) for ids in chunk_block_ids), default=0)
self._pipelined_chunk_offsets[rid] = chunk_block_offset + num_blocks_this_chunk # TODO(athenac): this might be a faulty calculation
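
One possible way to keep the offset bookkeeping above from drifting when a prefill chunk ends mid-block is to derive the block range from token positions and defer a partially filled block to the next chunk. This is a sketch of that alternative, not the approach in the PR or a confirmed fix; `chunk_block_range` and its parameters are hypothetical.

```python
def chunk_block_range(prev_tokens_done, tokens_done, tokens_per_block,
                      is_last_chunk):
    """Blocks to send for this prefill chunk, as a [start, end) block range."""
    # Blocks fully written by earlier chunks were already sent.
    start_block = prev_tokens_done // tokens_per_block
    if is_last_chunk:
        # Ceiling division: the final chunk also sends the partial tail block.
        end_block = -(-tokens_done // tokens_per_block)
    else:
        # Floor division: send only blocks that are completely written.
        end_block = tokens_done // tokens_per_block
    return start_block, end_block


# Assumed example: 32 tokens per block, prefill chunks ending at 100, 200, 250.
print(chunk_block_range(0, 100, 32, False))   # (0, 3)
print(chunk_block_range(100, 200, 32, False)) # (3, 6)
print(chunk_block_range(200, 250, 32, True))  # (6, 8)
```

In this scheme `start_block` plays the role of `chunk_block_offset`, and each block is sent exactly once because consecutive ranges share their boundary.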

🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Inspect how context_chunk_size relates to tokens_per_block for chunked prefill
rg -nP -C3 '(context_chunk_size|chunk_size|tokens_per_block)' --type=py -g '!**/test*' tensorrt_llm/_torch/pyexecutor | rg -nP -C3 'tokens_per_block|chunk'

Repository: NVIDIA/TensorRT-LLM

Length of output: 50376


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== transceiver slice =="
sed -n '560,630p' tensorrt_llm/_torch/disaggregation/transceiver.py

echo
echo "== scheduler chunk sizing =="
sed -n '540,620p' tensorrt_llm/_torch/pyexecutor/scheduler/scheduler_v2.py

echo
echo "== alignment helper =="
sed -n '611,780p' tensorrt_llm/_torch/pyexecutor/scheduler/scheduler_v2.py

echo
echo "== search for context_chunk_size consumers =="
rg -n "context_chunk_size|is_last_context_chunk|chunk_unit_size|tokens_per_block" tensorrt_llm/_torch/pyexecutor tensorrt_llm/_torch/disaggregation -g '*.py'

Repository: NVIDIA/TensorRT-LLM

Length of output: 42122


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== transceiver callers =="
rg -n "chunk_start_block|chunk_end_block|send_kv|send_async|_send.*chunk|KVCacheTransceiverV2" tensorrt_llm/_torch -g '*.py' -C 3

echo
echo "== request_position / current_position handling around chunk scheduling =="
rg -n "context_current_position|context_remaining_length|context_chunk_size|is_last_context_chunk" tensorrt_llm/_torch/pyexecutor -g '*.py' -C 2 | sed -n '1,220p'

Repository: NVIDIA/TensorRT-LLM

Length of output: 38455


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== chunk_unit_size setup =="
sed -n '690,720p' tensorrt_llm/_torch/pyexecutor/py_executor_creator.py

echo
echo "== request chunk position bookkeeping =="
sed -n '4940,4985p' tensorrt_llm/_torch/pyexecutor/py_executor.py

echo
echo "== LlmRequest chunk helpers =="
rg -n "py_last_context_chunk|move_to_next_context_chunk|context_current_position" tensorrt_llm/_torch -g '*.py' -C 2

Repository: NVIDIA/TensorRT-LLM

Length of output: 25841


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== tokens_per_block defaults and overrides =="
rg -n "tokens_per_block *=|tokens_per_block: int|tokens_per_block=.*64|tokens_per_block=.*32|max\(256, chunk_unit_size\)|chunk_unit_size = max\(256, chunk_unit_size\)" tensorrt_llm/_torch -g '*.py' -C 2

echo
echo "== any validation on tokens_per_block divisibility or allowed values =="
rg -n "tokens_per_block.*(must|should|assert|raise|divis|power of two|64|32)" tensorrt_llm/_torch -g '*.py' -C 2

Repository: NVIDIA/TensorRT-LLM

Length of output: 50375


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== any validation / normalization of tokens_per_block =="
rg -n "tokens_per_block.*(32|64|128|256|power|validate|assert|must|allowed|divisible)" tensorrt_llm/_torch tensorrt_llm -g '*.py' -C 2

echo
echo "== kv cache config field docs =="
rg -n "tokens_per_block" tensorrt_llm/_torch -g '*.py' -C 1 | sed -n '1,120p'

Repository: NVIDIA/TensorRT-LLM

Length of output: 29457


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== explicit tokens_per_block assignments =="
rg -n "tokens_per_block\s*=\s*(32|64|128|256|512|1024)" tensorrt_llm -g '*.py' -C 1

echo
echo "== tokens_per_block in config schema / docs =="
rg -n "tokens_per_block" tensorrt_llm -g '*.py' -g '*.md' -C 1 | rg -n "default|32|64|128|256|power|multiple"

Repository: NVIDIA/TensorRT-LLM

Length of output: 17643


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== prepopulated prompt len / current position semantics =="
rg -n "set_prepopulated_prompt_len|prepopulated_prompt_len|num_committed_tokens" tensorrt_llm/_torch/pyexecutor -g '*.py' -C 2

Repository: NVIDIA/TensorRT-LLM

Length of output: 5346


Keep prefill chunk boundaries on KV block boundaries. send_prefill_chunk() slices by block index, so a chunk boundary that is not a multiple of tokens_per_block resends the boundary block and skews _pipelined_chunk_offsets. The 256-token sliding-window fallback is only safe when 256 is itself a multiple of tokens_per_block; otherwise clamp the chunk size or drop the overlap.
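The alignment rule can be sketched as below. This is a minimal illustration, not the scheduler's code: `align_chunk_tokens` and `chunk_block_range` are hypothetical helpers, and the only name taken from the thread is `tokens_per_block`. It shows why an unaligned boundary makes two consecutive chunks touch the same block, and how rounding the chunk size down to a block multiple avoids that.

```python
def align_chunk_tokens(chunk_tokens: int, tokens_per_block: int) -> int:
    """Round a proposed chunk size down to a whole number of KV blocks (hypothetical helper)."""
    if tokens_per_block <= 0:
        raise ValueError("tokens_per_block must be positive")
    aligned = (chunk_tokens // tokens_per_block) * tokens_per_block
    # Never return 0: a chunk must make progress, so one block is the floor.
    return max(aligned, tokens_per_block)


def chunk_block_range(start_token: int, end_token: int, tokens_per_block: int) -> tuple[int, int]:
    """Half-open block range [first, last) touched by tokens [start_token, end_token)."""
    first = start_token // tokens_per_block
    last = -(-end_token // tokens_per_block)  # ceiling division
    return first, last


# Unaligned boundary at token 100 with 64-token blocks: both chunks touch block 1.
unaligned = [chunk_block_range(0, 100, 64), chunk_block_range(100, 200, 64)]

# Aligned boundary: 100 is rounded down to 64, so consecutive chunks share no block.
size = align_chunk_tokens(100, 64)
aligned = [chunk_block_range(0, size, 64), chunk_block_range(size, 2 * size, 64)]
```

With block-aligned boundaries, the per-request block offset can advance by exactly the number of blocks in each chunk, which is what the `_pipelined_chunk_offsets` update appears to assume.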

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tensorrt_llm/_torch/disaggregation/transceiver.py` around lines 591 - 611,
The chunking logic in send_prefill_chunk() and the _pipelined_chunk_offsets
update can split KV slices on token boundaries that are not aligned to
tokens_per_block, which causes the boundary block to be resent and offsets to
drift. Adjust the prefill chunk selection so every chunk boundary lands on a KV
block boundary (or clamp the sliding-window fallback so it only overlaps when it
evenly divides tokens_per_block), and then recompute _pipelined_chunk_offsets
from the actual block count in the chunk.

assert rid is not None

# If pipelined transfer already sent all chunks, skip re-sending.
if rid in self._send_sessions and rid not in self._pipelined_chunk_offsets: return

Contributor

🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win

Move return to its own line (lint failure) and confirm the skip guard.

Static analysis flags E701 (multiple statements on one line). Beyond the formatting, note the guard rid in self._send_sessions and rid not in self._pipelined_chunk_offsets only skips re-send once the last pipelined chunk has run (which pops rid from _pipelined_chunk_offsets). If respond_and_send_async is ever reached while pipelined chunks are still in flight (rid still present), it falls through and re-sends the full slice set via _create_kv_slices, duplicating the transfer.
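One way to make the three cases explicit is sketched here. It is an assumption-laden illustration rather than the transceiver's logic: `SendAction` and `decide_send_action` are hypothetical, the two dictionaries stand in for `_send_sessions` and `_pipelined_chunk_offsets`, and it assumes (as the comment above describes) that the last pipelined chunk removes the request id from the offsets map.

```python
from enum import Enum


class SendAction(Enum):
    SEND_FULL = "send_full"  # pipelining never started for this request
    WAIT_PIPELINE = "wait"   # chunks still in flight; a full resend would duplicate data
    SKIP = "skip"            # the last pipelined chunk already went out


def decide_send_action(rid: int, send_sessions: dict, pipelined_chunk_offsets: dict) -> SendAction:
    """Classify a request from the two bookkeeping maps (hypothetical helper)."""
    if rid in pipelined_chunk_offsets:
        # Offsets entry still present: the final chunk has not been sent yet.
        return SendAction.WAIT_PIPELINE
    if rid in send_sessions:
        # Session exists but offsets were popped: pipelined transfer completed.
        return SendAction.SKIP
    return SendAction.SEND_FULL
```

Whether `respond_and_send_async` can actually be reached in the in-flight state depends on the executor's call order, which this sketch does not model.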

🐛 Proposed formatting fix
-        if rid in self._send_sessions and rid not in self._pipelined_chunk_offsets: return
+        if rid in self._send_sessions and rid not in self._pipelined_chunk_offsets:
+            return
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if rid in self._send_sessions and rid not in self._pipelined_chunk_offsets: return
if rid in self._send_sessions and rid not in self._pipelined_chunk_offsets:
    return
🧰 Tools
🪛 Ruff (0.15.20)

[error] 648-648: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tensorrt_llm/_torch/disaggregation/transceiver.py` at line 648, The
`respond_and_send_async` skip guard in `transceiver.py` needs both a lint fix
and a logic check: move the `return` onto its own line to satisfy E701, and
verify the condition around `rid in self._send_sessions and rid not in
self._pipelined_chunk_offsets` correctly prevents duplicate sends while
pipelined chunks are still outstanding. If needed, adjust the guard so the full
`_create_kv_slices` resend path only runs when it is safe, using the existing
`_send_sessions` and `_pipelined_chunk_offsets` state to avoid duplicate
transfer.

Source: Linters/SAST tools

Comment on lines +1789 to +1825
for sender_session, block_ids_per_groups in zip(sender_sessions, ctx_block_ids):
    max_blocks = max(len(ids) for ids in block_ids_per_groups)
    num_chunks = math.ceil(max_blocks / chunk_size_blocks)
    chunk_offset = 0
    for chunk_idx in range(num_chunks):
        start = chunk_idx * chunk_size_blocks
        end = start + chunk_size_blocks
        is_last = chunk_idx == num_chunks - 1
        chunk_block_ids = [ids[start:end] for ids in block_ids_per_groups]
        kv_slice = KVSlice(
            is_last_slice=is_last,
            block_ids_per_layer_groups=chunk_block_ids,
        )
        cuda_event = torch.cuda.Event()
        cuda_event.record()
        send_futures.append(
            sender_session.send(
                kv_slice, chunk_block_offset=chunk_offset, cuda_event=cuda_event
            )
        )
        chunk_offset += max(len(ids) for ids in chunk_block_ids)

receiver_sessions = [
    tw.create_rx_session(ctx_info["gen_request"]) for tw in gen_transfer_workers
]
recv_futures = []
for recv_session, block_ids_per_groups in zip(receiver_sessions, gen_block_ids):
    full_slice = KVSlice(
        is_last_slice=True,
        block_ids_per_layer_groups=block_ids_per_groups,
    )
    recv_futures.append(recv_session.receive(full_slice))

for f in send_futures:
    f.result()
for f in recv_futures:
    f.result()

Contributor

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm send/receive signatures and return types.
rg -nP 'def send\(|def receive\(' \
  tensorrt_llm/_torch/disaggregation/native/transfer.py tensorrt_llm/_torch/disaggregation/base/transfer.py -A4

Repository: NVIDIA/TensorRT-LLM

Length of output: 1946


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the test helper around the flagged lines.
sed -n '1760,1845p' tests/unittest/disaggregated/test_kv_transfer.py

printf '\n--- KVSlice definition ---\n'
rg -n "class KVSlice|chunk_block_offset|cuda_event" tensorrt_llm/_torch/disaggregation -A6 -B4

printf '\n--- wait_complete usage in tests ---\n'
rg -n "wait_complete\(" tests/unittest/disaggregated/test_kv_transfer.py tensorrt_llm/_torch/disaggregation -A3 -B3

Repository: NVIDIA/TensorRT-LLM

Length of output: 26431


Build the KVSlice metadata before calling send()/receive()

TxSession.send()/RxSession.receive() take only the slice and return None, so passing chunk_block_offset/cuda_event as kwargs and then calling .result() on the return values will fail. Set those fields on KVSlice here, and synchronize with wait_complete()/wait_complete(blocking=True) instead.
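The call shape described above might look roughly like the following. The classes are stand-ins written for this sketch, not the real `KVSlice` or `TxSession`: the field names `chunk_block_offset` and `cuda_event` are taken from the review text and not verified against the library, and the `wait_complete()` behaviour shown is an assumed simplification.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeKVSlice:
    """Stand-in for KVSlice; the optional fields mirror names used in the review."""
    is_last_slice: bool
    block_ids_per_layer_groups: List[List[int]]
    chunk_block_offset: int = 0
    cuda_event: Optional[object] = None


class FakeTxSession:
    """Stand-in for TxSession: send() takes only the slice and returns None."""

    def __init__(self) -> None:
        self.sent: List[FakeKVSlice] = []

    def send(self, kv_slice: FakeKVSlice) -> None:
        self.sent.append(kv_slice)

    def wait_complete(self) -> bool:
        # Assumed semantics: success once the last slice has been queued.
        return bool(self.sent) and self.sent[-1].is_last_slice


def send_in_chunks(session, block_ids_per_groups, chunk_size_blocks):
    """Populate each slice fully, send it, then synchronize on the session."""
    max_blocks = max((len(ids) for ids in block_ids_per_groups), default=0)
    offset = 0
    for start in range(0, max_blocks, chunk_size_blocks):
        end = start + chunk_size_blocks
        chunk = [ids[start:end] for ids in block_ids_per_groups]
        session.send(FakeKVSlice(
            is_last_slice=end >= max_blocks,
            block_ids_per_layer_groups=chunk,
            chunk_block_offset=offset,  # metadata travels on the slice, not as kwargs
        ))
        offset += max(len(ids) for ids in chunk)
    return session.wait_complete()


session = FakeTxSession()
ok = send_in_chunks(session, [list(range(5)), list(range(3))], chunk_size_blocks=2)
```

The point of the sketch is only the shape: no return value from `send()` is collected, and completion is checked once on the session after the last slice.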

🧰 Tools
🪛 Ruff (0.15.20)

[warning] 1789-1789: zip() without an explicit strict= parameter

Add explicit value for parameter strict=

(B905)


[warning] 1815-1815: zip() without an explicit strict= parameter

Add explicit value for parameter strict=

(B905)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unittest/disaggregated/test_kv_transfer.py` around lines 1789 - 1825,
The send/receive flow is using the wrong API shape: TxSession.send() and
RxSession.receive() should be called with a fully populated KVSlice rather than
extra kwargs, and they do not return futures. Update the test setup around
KVSlice, sender_session.send(), and receiver_sessions/RxSession.receive() to set
chunk_block_offset and cuda_event on the slice object before calling
send/receive, then replace the .result() waits with
wait_complete()/wait_complete(blocking=True) on the session or slice as
appropriate.

Synchronization on KVSlice::cuda_event for pipelined KV block transfer is redundant because PyExecutor::_forward_step() already does this.

Signed-off-by: Athena Cai <athenac@nvidia.com>
@xinhe-nv

Collaborator

@coderabbitai check the test coverage, including functional, perf, stress, and multi-node tests.

@coderabbitai

coderabbitai Bot commented Jun 30, 2026

Contributor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Changed/mentioned test files =="
for f in \
  tests/unittest/disaggregated/test_chunked_transfer.py \
  tests/unittest/disaggregated/test_kv_transfer.py \
  tests/unittest/llmapi/test_llm_args.py \
  tests/integration/defs/accuracy/test_disaggregated_serving.py \
  tests/integration/test_lists/test-db/l0_dgx_b200.yml \
  tensorrt_llm/_torch/disaggregation/transceiver.py \
  tensorrt_llm/_torch/pyexecutor/py_executor.py \
  tensorrt_llm/_torch/pyexecutor/kv_cache_transceiver.py
do
  if [ -f "$f" ]; then
    echo "-- outline: $f"
    ast-grep outline "$f" --view expanded || true
    echo
  else
    echo "Missing: $f"
  fi
done

echo "== Search for perf/stress/multi-node/disaggregated related tests =="
rg -n -i -C2 \
  '(perf|performance|benchmark|stress|soak|multi[-_ ]node|multinode|disaggregated serving|disaggregated|chunked|pipelined)' \
  tests tensorrt_llm/_torch | sed -n '1,260p'

echo
echo "== Inspect relevant slices from changed tests =="
sed -n '1,260p' tests/unittest/disaggregated/test_chunked_transfer.py
echo "-----"
sed -n '1,260p' tests/unittest/disaggregated/test_kv_transfer.py
echo "-----"
sed -n '1,220p' tests/integration/defs/accuracy/test_disaggregated_serving.py
echo "-----"
sed -n '1,220p' tests/unittest/llmapi/test_llm_args.py
echo "-----"
sed -n '1,220p' tests/integration/test_lists/test-db/l0_dgx_b200.yml

echo
echo "== Inspect relevant implementation slices for testability points =="
sed -n '1,260p' tensorrt_llm/_torch/disaggregation/transceiver.py
echo "-----"
sed -n '1,220p' tensorrt_llm/_torch/pyexecutor/py_executor.py
echo "-----"
sed -n '1,220p' tensorrt_llm/_torch/pyexecutor/kv_cache_transceiver.py

Length of output: 144072


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Direct test references for new pipelined/chunked entry points =="
rg -n -C2 \
  '_maybe_send_prefill_chunk|send_prefill_chunk|enable_pipelined_transfer|respond_and_send_async|request_and_receive_async|_create_kv_slices|create_kv_cache_transceiver' \
  tests tensorrt_llm/_torch | sed -n '1,260p'

echo
echo "== Disaggregated multi-node / stress / soak style tests in test tree =="
rg -n -i -C2 \
  '(multi[-_ ]node|multinode|soak|stress|long[-_ ]run|longevity|burn[-_ ]in)' \
  tests | sed -n '1,220p'

echo
echo "== Show pipelined accuracy test and nearby YAML entries =="
sed -n '720,820p' tests/integration/defs/accuracy/test_disaggregated_serving.py
echo "-----"
rg -n -C2 'chunked_kv_transfer|pipelined_kv_transfer' tests/integration/test_lists/test-db/l0_dgx_b200.yml

Length of output: 49120


@xinhe-nv I checked the coverage. Short version: function coverage is decent, but perf/stress/multi-node coverage for this new feature is still incomplete.

What is covered well

Function / unit tests

  • tests/unittest/disaggregated/test_chunked_transfer.py
    • Covers KVSendTask, TxSession, RxSession
    • Covers multi-slice status/error handling
    • Covers enable_pipelined_transfer
    • Covers the “skip re-send” branch in respond_and_send_async
  • tests/unittest/disaggregated/test_kv_transfer.py
    • Covers _create_kv_slices
    • Covers chunk integrity across layer groups
    • Covers mamba_state_index propagation
    • Adds end-to-end chunked and pipelined transfer tests

Integration / accuracy

  • tests/integration/defs/accuracy/test_disaggregated_serving.py
    • test_chunked_kv_transfer_nixl_python_accuracy at Line 744
    • test_pipelined_kv_transfer_nixl_python_accuracy at Line 787

Gaps I found

1. Some new entry points are not directly tested
I did not find direct tests for:

  • tensorrt_llm._torch.disaggregation.transceiver.KvCacheTransceiverV2.send_prefill_chunk
  • tensorrt_llm._torch.pyexecutor.py_executor.PyExecutor._maybe_send_prefill_chunk
  • tensorrt_llm._torch.pyexecutor.kv_cache_transceiver.create_kv_cache_transceiver
  • tensorrt_llm._torch.disaggregation.transceiver.KvCacheTransceiverV2.request_and_receive_async

So the core mechanics are exercised indirectly, but several newly added integration points are still missing targeted tests.

2. Pipelined accuracy test does not appear to be scheduled in the shown pre-merge list

  • tests/integration/test_lists/test-db/l0_dgx_b200.yml includes the chunked accuracy test at Line 20 and Line 21
  • I did not find the new test_pipelined_kv_transfer_nixl_python_accuracy in that YAML

So pipelined accuracy exists in code, but it does not look wired into this pre-merge list yet.

3. No PR-specific perf validation
I found general perf infrastructure in the repo, but nothing added here that validates the PR goal:

  • no latency assertion
  • no overlap/hidden-transfer metric
  • no benchmark comparing pipelined vs non-pipelined transfer

Given the PR objective is explicitly performance-oriented, this is a notable gap.

4. No PR-specific stress coverage
The repo has general stress suites, including disaggregated stress entries in:

  • tests/integration/test_lists/qa/llm_function_stress.txt

But I did not find stress coverage specifically targeting:

  • chunked KV transfer
  • pipelined KV transfer
  • repeated chunk-send / cancel / failure / long-run stability for this path

5. No PR-specific multi-node coverage
The repo does have multi-node disaggregated tests, e.g.:

  • tests/unittest/llmapi/apps/_test_disagg_serving_multi_nodes.py
  • tests/unittest/llmapi/apps/_test_disagg_serving_multi_nodes_service_discovery.py
  • multi-node perf YAMLs under tests/integration/test_lists/test-db/

But I did not find multi-node tests specifically exercising this PR’s new chunked/pipelined KV transfer path.

Overall assessment

  • Function test coverage: good
  • Accuracy coverage: present for both chunked and pipelined
  • Pre-merge coverage: incomplete, because pipelined accuracy does not appear scheduled
  • Perf coverage: missing for this feature
  • Stress coverage: missing for this feature
  • Multi-node coverage: missing for this feature

Recommended follow-ups

  1. Add direct unit tests for:
    • send_prefill_chunk
    • _maybe_send_prefill_chunk
    • create_kv_cache_transceiver auto-selection / warnings
    • request_and_receive_async
  2. Add the pipelined accuracy test to the appropriate test-db YAML.
  3. Add at least one perf sanity test for:
    • pipelined vs non-pipelined transfer
    • visible transfer latency / overlap benefit
  4. Add a stress case for repeated chunked/pipelined disaggregated serving.
  5. Add a multi-node disaggregated case specifically for pipelined KV transfer.
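For the perf sanity item, a simple analytical model could serve as a starting point for what a benchmark should observe. This is a sketch under stated assumptions: one compute stream, one transfer link, per-chunk times supplied by the caller, and both function names invented here. It does not measure anything on real hardware.

```python
def serial_latency(prefill_ms, transfer_ms):
    """Non-pipelined: all prefill chunks finish before any transfer starts."""
    return sum(prefill_ms) + sum(transfer_ms)


def pipelined_latency(prefill_ms, transfer_ms):
    """Pipelined: chunk i's transfer starts once its prefill is done and the link is free."""
    if len(prefill_ms) != len(transfer_ms):
        raise ValueError("need one transfer time per prefill chunk")
    compute_done = 0.0
    link_free = 0.0
    for p, t in zip(prefill_ms, transfer_ms):
        compute_done += p
        link_free = max(link_free, compute_done) + t
    return link_free


# Transfer faster than prefill: only the last chunk's transfer stays exposed.
fast_link = pipelined_latency([10, 10, 10, 10], [4, 4, 4, 4])
# Transfer slower than prefill: the link becomes the bottleneck.
slow_link = pipelined_latency([10, 10, 10, 10], [15, 15, 15, 15])
baseline = serial_latency([10, 10, 10, 10], [4, 4, 4, 4])
```

A benchmark comparing pipelined and non-pipelined runs could check that the measured gap is in the same direction as this model predicts, without asserting exact numbers.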

If you want, I can turn this into a concrete review comment with prioritized test additions.


3 participants